
// author: niXman
// e-mail: i.nixman@gmail.com
// date: 28.05.2011
//
//
// Boost Software License - Version 1.0 - August 17th, 2003
//
// Permission is hereby granted, free of charge, to any person or organization
// obtaining a copy of the software and accompanying documentation covered by
// this license (the "Software") to use, reproduce, display, distribute,
// execute, and transmit the Software, and to prepare derivative works of the
// Software, and to permit third-parties to whom the Software is furnished to
// do so, all subject to the following:
//
// The copyright notices in the Software and this entire statement, including
// the above license grant, this restriction and the following disclaimer,
// must be included in all copies of the Software, in whole or in part, and
// all derivative works of the Software, unless such copies or derivative
// works are solely in the form of machine-executable object code generated by
// a source language processor.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

#ifndef _netclasses_socket_hpp_included_
#define _netclasses_socket_hpp_included_

#include <cstddef>
#include <cstring>
#include <deque>
#include <stdexcept>
#include <string>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/cstdint.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_array.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/mutex.hpp>

namespace netclasses {

/***************************************************************************/

struct socket: private boost::noncopyable {
   explicit socket(boost::asio::io_service& ios)
      :sock(ios)
   {}

   ~socket() {
      boost::system::error_code ec;
      disconnect(ec);
   }

   boost::asio::ip::tcp::socket& get_socket() { return sock; }

   void connect(const std::string& ip, boost::uint16_t port) {
      sock.connect(boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip), port));
   }
   void connect(const std::string& ip, boost::uint16_t port, boost::system::error_code& e) {
      sock.connect(boost::asio::ip::tcp::endpoint(boost::asio::ip::address::from_string(ip), port), e);
   }

   void disconnect() {
      reset();
      if ( is_open() ) {
         sock.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
         sock.close();
      }
   }
   void disconnect(boost::system::error_code& e) {
      reset(e);
      if ( e ) return;
      if ( is_open() ) {
         sock.shutdown(boost::asio::ip::tcp::socket::shutdown_both, e);
         if ( e ) return;
         sock.close(e);
      }
   }

   size_t available() const { return sock.available(); }
   size_t available(boost::system::error_code& e) const { return sock.available(e); }

   bool is_open() const { return sock.is_open(); }

   void cancel() { sock.cancel(); }
   void cancel(boost::system::error_code& e) { sock.cancel(e); }

   void close() { sock.close(); }
   void close(boost::system::error_code& e) { sock.close(e); }

   void reset() {
      clear_read_queue();
      clear_write_queue();
      cancel();
   }
   void reset(boost::system::error_code& e) {
      clear_read_queue();
      clear_write_queue();
      cancel(e);
   }

   size_t read_queue_size() const {
      boost::mutex::scoped_lock locker(read_mutex);
      return read_queue.size();
   }
   size_t write_queue_size() const {
      boost::mutex::scoped_lock locker(write_mutex);
      return write_queue.size();
   }

   void clear_read_queue() {
      boost::mutex::scoped_lock locker(read_mutex);
      read_queue.clear();
   }
   void clear_write_queue() {
      boost::mutex::scoped_lock locker(write_mutex);
      write_queue.clear();
   }

   size_t write(const void* ptr, size_t size) {
      return boost::asio::write(sock, boost::asio::buffer(ptr, size));
   }
   size_t write(const void* ptr, size_t size, boost::system::error_code& e) {
#if BOOST_ASIO_VERSION >= 100503
      return boost::asio::write(sock, boost::asio::buffer(ptr, size), e);
#else
      return boost::asio::write(sock, boost::asio::buffer(ptr, size), boost::asio::transfer_all(), e);
#endif
   }
   size_t write(const std::string& str) {
      return write(str.data(), str.size());
   }
   size_t write(const std::string& str, boost::system::error_code& e) {
      return write(str.data(), str.size(), e);
   }

   template<typename F>
   void async_write(boost::shared_array<char> arr, size_t size, F handler) {
      boost::mutex::scoped_lock locker(write_mutex);

      /**  */
      if ( !sock.is_open() ) { handler(boost::system::error_code(boost::asio::error::not_connected), arr, size); return; }
      if ( !arr.get() || !size ) { handler(boost::system::error_code(boost::asio::error::invalid_argument), arr, size); return; }

      /**  */
      queue_item_ptr item(new queue_item_ptr::element_type);
      item->handler = handler;
      item->buf = arr;
      item->size = size;

      /**  */
      write_queue.push_back(item);
      if ( write_queue.size() == 1 ) {
         locker.unlock();
         write_exec();
      }
   }
   void async_write(boost::shared_array<char> arr, size_t size, void(*handler)(const boost::system::error_code&, boost::shared_array<char>, size_t)) {
      async_write(arr, size, boost::bind(handler, _1, _2, _3));
   }
   template<typename Obj>
   void async_write(boost::shared_array<char> arr, size_t size, Obj* obj, void(Obj::*handler)(const boost::system::error_code&, boost::shared_array<char>, size_t)) {
      async_write(arr, size, boost::bind(handler, obj, _1, _2, _3));
   }
   template<typename Obj>
   void async_write(boost::shared_array<char> arr, size_t size, boost::shared_ptr<Obj> obj, void(Obj::*handler)(const boost::system::error_code&, boost::shared_array<char>, size_t)) {
      async_write(arr, size, boost::bind(handler, obj, _1, _2, _3));
   }
   void async_write(const std::string& str, void(*handler)(const boost::system::error_code&)) {
      const size_t size = str.size();
      boost::shared_array<char> arr(new char[size]);
      memcpy(arr.get(), str.c_str(), size);
      async_write(arr, size, boost::bind(handler, _1));
   }
   template<typename Obj>
   void async_write(const std::string& str, Obj* obj, void(Obj::*handler)(const boost::system::error_code&)) {
      const size_t size = str.size();
      boost::shared_array<char> arr(new char[size]);
      memcpy(arr.get(), str.c_str(), size);
      async_write(arr, size, boost::bind(handler, obj, _1));
   }
   template<typename Obj>
   void async_write(const std::string& str, boost::shared_ptr<Obj> obj, void(Obj::*handler)(const boost::system::error_code&)) {
      const size_t size = str.size();
      boost::shared_array<char> arr(new char[size]);
      memcpy(arr.get(), str.c_str(), size);
      async_write(arr, size, boost::bind(handler, obj, _1));
   }

   size_t read(void* ptr, size_t size) {
      return boost::asio::read(sock, boost::asio::buffer(ptr, size));
   }
   size_t read(void* ptr, size_t size, boost::system::error_code& e) {
#if BOOST_ASIO_VERSION >= 100503
      return boost::asio::read(sock, boost::asio::buffer(ptr, size), e);
#else
      return boost::asio::read(sock, boost::asio::buffer(ptr, size), boost::asio::transfer_all(), e);
#endif
   }
   size_t read(std::string& str, size_t size) {
      return read(&str[0], size);
   }
   size_t read(std::string& str, size_t size, boost::system::error_code& e) {
      return read(&str[0], size, e);
   }

   template<typename F>
   void async_read(size_t size, F handler) {
      boost::mutex::scoped_lock locker(read_mutex);

      /**  */
      if ( !sock.is_open() ) { handler(boost::system::error_code(boost::asio::error::not_connected), boost::shared_array<char>(), 0); return; }
      if ( !size ) { handler(boost::system::error_code(boost::asio::error::invalid_argument), boost::shared_array<char>(), 0); return; }

      /**  */
      queue_item_ptr item(new queue_item_ptr::element_type);
      item->handler = handler;
      item->size = size;

      /**  */
      read_queue.push_back(item);
      if ( read_queue.size() == 1 ) {
         locker.unlock();
         read_exec();
      }
   }
   void async_read(size_t size, void(*handler)(const boost::system::error_code&, boost::shared_array<char>, size_t)) {
      async_read(size, boost::bind(handler, _1, _2, _3));
   }
   template<typename Obj>
   void async_read(size_t size, Obj* obj, void(Obj::*handler)(const boost::system::error_code&, boost::shared_array<char>, size_t)) {
      async_read(size, boost::bind(handler, obj, _1, _2, _3));
   }
   template<typename Obj>
   void async_read(size_t size, boost::shared_ptr<Obj> obj, void(Obj::*handler)(const boost::system::error_code&, boost::shared_array<char>, size_t)) {
      async_read(size, boost::bind(handler, obj, _1, _2, _3));
   }

private:
   typedef boost::function<
      void(const boost::system::error_code&, boost::shared_array<char>, size_t)
   > handler_type;

   struct queue_item {
      handler_type handler;
      boost::shared_array<char> buf;
      size_t size;
   };
   typedef boost::shared_ptr<queue_item> queue_item_ptr;

   void write_exec() {
      queue_item_ptr item;
      {  boost::mutex::scoped_lock locker(write_mutex);
         item = write_queue.front();
      }

      boost::asio::async_write(
         sock,
         boost::asio::buffer(item->buf.get(), item->size),
         boost::bind(
            &socket::write_exec_handler,
            this,
            item,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred
         )
      );
   }

   void write_exec_handler(queue_item_ptr item, const boost::system::error_code& e, size_t wr) {
      item->handler(e, item->buf, wr);

      {  boost::mutex::scoped_lock locker(write_mutex);
         write_queue.pop_front();
         if ( !write_queue.empty() ) {
            write_mutex.unlock();
            write_exec();
         }
      }
   }

   void read_exec() {
      queue_item_ptr item;
      {  boost::mutex::scoped_lock locker(read_mutex);
         item = read_queue.front();
         item->buf.reset(new char[item->size]);
      }

      boost::asio::async_read(
         sock,
         boost::asio::buffer(item->buf.get(), item->size),
         boost::bind(
            &socket::read_exec_handler,
            this,
            item,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred
         )
      );
   }

   void read_exec_handler(queue_item_ptr item, const boost::system::error_code& e, size_t rd) {
      item->handler(e, item->buf, rd);

      {  boost::mutex::scoped_lock locker(read_mutex);
         read_queue.pop_front();
         if ( !read_queue.empty() ) {
            read_mutex.unlock();
            read_exec();
         }
      }
   }

private:
   boost::asio::ip::tcp::socket sock;

   mutable boost::mutex read_mutex;
   std::deque<queue_item_ptr> write_queue;

   mutable boost::mutex write_mutex;
   std::deque<queue_item_ptr> read_queue;
};

// Shared-ownership handle to a netclasses::socket.
typedef boost::shared_ptr<socket> socket_ptr;

/***************************************************************************/

} // namespace netclasses

#endif // _netclasses_socket_hpp_included_
